Improve smart read #51

Merged: 13 commits merged into MaceKuailv:main on Jun 16, 2023

Conversation

@malmans2 (Contributor) commented Jun 15, 2023

This smart_read function is very interesting, I played a little with it today.

Here is an alternative implementation. I removed memory_chunk because it looks a little risky: what happens if you have large chunks? You can get the same behavior by passing da.compute() rather than da.
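
For example (a minimal sketch, not part of the PR; smart_read stands for either implementation, and da / indexes_tuple are the objects from the benchmark below):

values_lazy = smart_read(da, indexes_tuple)             # chunk-aware path
values_eager = smart_read(da.compute(), indexes_tuple)  # hits the plain-numpy branch
# da.compute() has no chunks left, so the whole array is loaded up front
# and indexed with ordinary fancy indexing.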

It would be nice to parametrise the switch that controls the use of vectorised indexing a little better, but I wouldn't know how to do it :)

Even if you don't merge this, I suggest improving the docstring a little to explain what's going on. "Try to do it fast and smartly." is not very helpful; try to explain the idea behind this function.

Here is a quick and dirty benchmark. It would be nice to test with real data; maybe this is just optimised for this specific case:

import oceanspy as ospy
import numpy as np

od = ospy.open_oceandataset.from_catalog("HYCOM")
da = od._ds.Eta

size = 100000
indexes_tuple = tuple(np.random.randint(n, size=size) for n in (5, 1251, 701))
%timeit old_smart_read(da, indexes_tuple)
%timeit new_smart_read(da, indexes_tuple)
np.testing.assert_equal(
    old_smart_read(da, indexes_tuple), new_smart_read(da, indexes_tuple)
)
275 ms ± 5.98 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
99.9 ms ± 5.74 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
da = da.chunk(100)
assert np.prod(da.data.numblocks) > 100
%timeit old_smart_read(da, indexes_tuple)
%timeit new_smart_read(da, indexes_tuple)
np.testing.assert_equal(
    old_smart_read(da, indexes_tuple), new_smart_read(da, indexes_tuple)
)
3.17 s ± 140 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
3.13 s ± 179 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

codecov bot commented Jun 15, 2023

Codecov Report

Patch coverage: 95.74% and project coverage change: -0.06 ⚠️

Comparison is base (80b8248) 92.86% compared to head (70da3d6) 92.81%.

❗ Current head 70da3d6 differs from pull request most recent head 479c13e. Consider uploading reports for the commit 479c13e to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##             main      #51      +/-   ##
==========================================
- Coverage   92.86%   92.81%   -0.06%     
==========================================
  Files          11       11              
  Lines        2635     2629       -6     
  Branches      718      720       +2     
==========================================
- Hits         2447     2440       -7     
- Misses         84       85       +1     
  Partials      104      104              
Impacted Files Coverage Δ
seaduck/smart_read.py 95.23% <95.12%> (-2.68%) ⬇️
seaduck/eulerian.py 93.95% <100.00%> (ø)

@MaceKuailv (Owner)

Thank you so much! It is certainly cleaner and I will merge it.

I mostly understand what you are doing, but I am still wondering where the speed-up came from. It seems like you are iterating from the first chunk until every single index is found. Wouldn't that make it slower when the indices appear pretty "late" in a large dataset? Yet, it is faster. Why is that?

Yes, I will change the docstring.

@malmans2 (Contributor Author)

I'm not sure I understand. How do we know that we can skip a chunk if we don't look at it?
The first iteration does not read data from disk; it just looks at the indexes to figure out whether or not we can skip the chunk.
We only read data when we use compute().
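
Roughly, that first pass looks like this for one dimension of one block (a minimal sketch with made-up numbers, not the PR code):

import numpy as np

# Suppose a dimension is chunked as (100, 100, 50) and we are testing block 1,
# i.e. the global index range [100, 200).
chunks = (100, 100, 50)
block_id = 1
indexes = np.array([3, 150, 199, 240])

offset = sum(chunks[:block_id])  # 100, where this block starts
shifted = indexes - offset       # indexes relative to the block
mask = (shifted >= 0) & (shifted < chunks[block_id])
# mask -> [False, True, True, False]; only if mask.any() is True do we ever
# call .compute() on the block, so empty blocks are skipped without any I/O.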

@MaceKuailv (Owner)

I was using the indexes to bin the particles first. The key step was np.unique, which returns only the bins that have data in them. I set return_inverse = True. I guess I can get rid of that and use your scheme.

To clarify: even though we are not reading data, (index>0).any() could still take some time if the number of chunks and the number of points of interest are both large.

Now that I think about it, the main difference in performance might be the overhead from return_inverse = True. It might also be a good idea to do some benchmarking on using dask. It seems like dask is more efficient either when the number of points is small or when the points are very spread out; this was not the case when I developed this (it is also possible that the Oceanography image I had did not have the latest version of xarray/dask).
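
For reference, this is roughly what the binning with np.unique(..., return_inverse=True) did (a minimal standalone sketch, not the actual code from either branch):

import numpy as np

# Stack the per-dimension indexes into a (ndim, npoints) array, then deduplicate
# the columns: repeated particles collapse onto the same unique column.
indexes_tuple = (np.array([0, 0, 2, 0]), np.array([5, 5, 7, 5]))
stacked = np.stack(indexes_tuple, axis=0)
unique_cols, inverse = np.unique(stacked, axis=1, return_inverse=True)

# `inverse` maps every original point back to its unique column, so after
# reading the unique points once, values[inverse] scatters the result back.
assert np.array_equal(unique_cols[:, inverse.ravel()], stacked)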

@malmans2 (Contributor Author)

Good point. Let me check if running a separate loop with unique values is faster.

@malmans2 (Contributor Author)

I tried the code below, but it was a little slower than the old function.
The slowdown comes from np.unique with axis=1 rather than from return_inverse=True.
I don't know enough about this topic; some thoughts:

  1. Maybe you only see the advantage when there are many repeated particles? The slowdown in the test I've done is not too bad, so it could still make sense to implement unique if it works well with real data.
  2. Maybe there's no need to use unique at all, because numpy/dask internals already do something similar and better?

It needs some thought/benchmarking, so I'll leave it to you to decide what's better!

import numpy as np


def smart_read(da, indexes_tuple, dask_more_efficient=100):
    if len(indexes_tuple) != da.ndim:
        raise ValueError(
            "indexes_tuple does not match the number of dimensions: "
            f"{len(indexes_tuple)} vs {da.ndim}"
        )

    shape = indexes_tuple[0].shape
    indexes_tuple = tuple(indexes.ravel() for indexes in indexes_tuple)

    if not da.chunks:
        # Not a dask-backed array: plain fancy indexing on the in-memory values.
        return da.values[indexes_tuple].reshape(shape)
    data = da.data

    # Deduplicate the requested points; `inverse` maps each original point
    # back to its unique column so we can expand the result at the end.
    unique_indexes, inverse = np.unique(
        np.stack(indexes_tuple, axis=0), axis=1, return_inverse=True
    )

    found_count = 0
    block_dict = {}
    for block_ids in np.ndindex(*data.numblocks):
        # Build the mask of points that fall inside this block,
        # without reading any data from disk.
        shifted_indexes = []
        mask = True
        for block_id, indexes, chunks in zip(block_ids, unique_indexes, data.chunks):
            shifted = indexes - sum(chunks[:block_id])
            block_mask = (shifted >= 0) & (shifted < chunks[block_id])
            if not block_mask.any() or not (mask := mask & block_mask).any():
                break  # empty block
            shifted_indexes.append(shifted)
        else:
            block_dict[block_ids] = (mask, shifted_indexes)
            if len(block_dict) >= dask_more_efficient:
                # Too many blocks involved: let dask's vectorised indexing handle it.
                return data.vindex[indexes_tuple].compute().reshape(shape)

            if (found_count := found_count + mask.sum()) == unique_indexes.shape[1]:
                break  # all blocks found

    # Read only the blocks that actually contain requested points.
    values = np.empty(unique_indexes.shape[1])
    for block_ids, (mask, shifted_indexes) in block_dict.items():
        block_values = data.blocks[block_ids].compute()
        values[mask] = block_values[tuple(indexes[mask] for indexes in shifted_indexes)]
    return values[inverse].reshape(shape)
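
A usage sketch for the function above (the chunked array here is made up for illustration, not the HYCOM data from the benchmark):

import dask.array as dsa
import numpy as np
import xarray as xr

# A small chunked DataArray standing in for real model output.
da = xr.DataArray(
    dsa.random.random((10, 1251, 701), chunks=(5, 400, 400)),
    dims=("time", "Y", "X"),
)
indexes_tuple = tuple(np.random.randint(n, size=1000) for n in da.shape)

values = smart_read(da, indexes_tuple)
print(values.shape)  # (1000,), same shape as each index array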

@MaceKuailv (Owner)

OK, thanks. This is not quite how I used unique, but I guess this part of the code is too opaque. I will take over from here, push to your branch and see what I can do with it.

@malmans2 (Contributor Author)

Sounds good. I pushed just a little optimization; I was storing too much stuff in the dict.

BTW, if you realise that your method was better, feel free to close!
I'm looking at this while doing other stuff, so it's likely that I'm missing something.

@MaceKuailv (Owner)

I have added a mode that first tries to cut out one array using the minimum and maximum indexes, as long as this is not too large. This way we don't have to cut out multiple chunks one by one.

It is usually faster to cut one large piece than to cut several small pieces.
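
Roughly the idea, as a sketch (with hypothetical names like boxed_read and max_box_size; not the exact code I pushed):

import numpy as np

def boxed_read(da, indexes_tuple, max_box_size=1e8):
    """Cut one bounding box around all requested indexes, then index it in memory."""
    lows = tuple(int(ix.min()) for ix in indexes_tuple)
    highs = tuple(int(ix.max()) + 1 for ix in indexes_tuple)
    box_size = np.prod([hi - lo for lo, hi in zip(lows, highs)])
    if box_size > max_box_size:
        # Bounding box too large: a real implementation would fall back to
        # the chunk-by-chunk scheme here.
        raise NotImplementedError
    # One contiguous cut-out instead of many small per-chunk reads.
    box = da.data[tuple(slice(lo, hi) for lo, hi in zip(lows, highs))].compute()
    shifted = tuple(ix - lo for ix, lo in zip(indexes_tuple, lows))
    return box[shifted]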

@malmans2 (Contributor Author) commented Jun 16, 2023

Slicing is a good idea, but I think the code was getting a little too messy.
I've pushed an alternative implementation.

I didn't get the goal of dense; could you please explain what you are trying to achieve?

You raised a good issue before: masking could be quite slow or even crash with a lot of particles. I tried dask, and it's looking good on my benchmark:

for power in range(8):
    size = 10 ** power
    print(f"{size=:e}")
    indexes_tuple = tuple(np.random.randint(n // 3, (n // 3) * 2, size=size) for n in (10, 1251, 701))
    %timeit old_smart_read(da, indexes_tuple)
    %timeit new_smart_read(da, indexes_tuple)
    print()
size=1.000000e+00
53.9 ms ± 18.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
41.7 ms ± 2.33 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+01
46.1 ms ± 2.64 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
36.6 ms ± 1.36 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+02
48.9 ms ± 1.6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
42.9 ms ± 2.09 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+03
49.2 ms ± 4.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
38.9 ms ± 2.56 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+04
56.9 ms ± 3.32 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
42.6 ms ± 2.98 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+05
179 ms ± 5.97 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
53.3 ms ± 2.74 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+06
1.96 s ± 143 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
384 ms ± 39.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

size=1.000000e+07
21.7 s ± 532 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
3.26 s ± 164 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Can you try on real data? If that works well, should we make dask a mandatory dependency?

@malmans2 (Contributor Author)

BTW, dask only kicks in for size > 10.E7, so the example above is not actually showing the use of dask.
I'm running a better benchmark with size 10.E8, but it's going to take a while.

@MaceKuailv (Owner)

if not (mask := block_mask if mask is None else mask & block_mask).any(): Wouldn't that just make mask an array after the first chunk?

@malmans2 (Contributor Author)

if not (mask := block_mask if mask is None else mask & block_mask).any(): Wouldn't that just make mask an array after the first chunk?

Yes, I think I had the same before, with an additional check (I used to check both block_mask.any() and (mask & block_mask).any()). The first check could give a little speed up in some cases, but it doesn't seem to be a game changer.

In the end mask must be a 1D array, so we either do that, or we store 3 masks separately and combine them at the end.
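
Something like this, in other words (a minimal sketch of the mask accumulation):

import numpy as np

block_masks = [
    np.array([True, True, False, True]),   # dimension 0
    np.array([True, False, False, True]),  # dimension 1
    np.array([True, True, True, True]),    # dimension 2
]

mask = None
for block_mask in block_masks:
    # After the first iteration `mask` is a 1-D boolean array; the AND keeps
    # only the points inside this block along every dimension seen so far.
    mask = block_mask if mask is None else mask & block_mask
    if not mask.any():
        break  # no point lives in this block, skip it
# mask -> [True, False, False, True]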

@MaceKuailv (Owner)

I did some benchmarking on SciServer, which is not a great place to do benchmarking. I changed one line:

indexes_tuple = tuple(
    np.random.randint(lw, up, size=size)
    for lw, up in ((800, 810), (0, 1251), (0, 701))
)

This moves the indexes further back in the array.

size=1.000000e+00
34.3 ms ± 767 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+01
117 ms ± 4.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+02
156 ms ± 3.85 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+03
162 ms ± 7.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+04
143 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+05
170 ms ± 5.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

size=1.000000e+06
548 ms ± 11.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

size=1.000000e+07
6.8 s ± 149 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Looks good.

@MaceKuailv MaceKuailv merged commit 66d9759 into MaceKuailv:main Jun 16, 2023
@malmans2 (Contributor Author)

This is using dask:

size=1.000000e+08
4min 15s ± 3.97 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
1min 23s ± 955 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
